package mnet

/*
 * SyncSession for request and response synchronously.
 * server send response with objId.
 * user use this SyncSession to Send message and
   return its response synchronously only giving Establish callback.
*/

import (
	"encoding/binary"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const (
	gRequireArraySize = 16
)

// response message struct.
type ResponseMessage struct {
	MsgId uint16
	Resp  []byte
}

// inside response struct.
type response struct {
	ResponseMessage
	objId uint64
}

// Establish function: isConnect is connected(true) or terminate(false) state.
type SyncEstablishFunc func(s *SyncSession, isConnect bool)

// Message data function: session, msgId and data.
type SyncMessageProcessFunc func(s *SyncSession, msgId uint16, data []byte)

// mutex map struct.
type responseChanMap struct {
	// response map
	respMap map[uint64]chan response

	// mutex for respMap map.
	mu sync.RWMutex
}

// new chan response map
func newResponseChanMap() *responseChanMap {
	return &responseChanMap{
		respMap: make(map[uint64]chan response),
	}
}

// add element
func (rcm *responseChanMap) add(objId uint64, resChan chan response) {
	rcm.mu.Lock()
	defer rcm.mu.Unlock()
	rcm.respMap[objId] = resChan
}

// remove element
func (rcm *responseChanMap) remove(objId uint64) {
	rcm.mu.Lock()
	defer rcm.mu.Unlock()
	delete(rcm.respMap, objId)
}

// get element.
func (rcm *responseChanMap) get(objId uint64) chan response {
	rcm.mu.RLock()
	defer rcm.mu.RUnlock()
	if v, ok := rcm.respMap[objId]; ok {
		return v
	} else {
		return nil
	}
}

// Synchronous Session struct.
type SyncSession struct {
	conn            IConnection                         // net connection.
	state           int32                               // state flag,1 state, 0 else closed.
	reqs            [gRequireArraySize]*responseChanMap // unique object id to response map
	nextObjId       uint64                              // next message unique object id
	EstablishFunc   SyncEstablishFunc                   // establish function.
	MessageFunc     SyncMessageProcessFunc              // message function.
	Attach          interface{}                         // session attach
	pool            *sync.Pool                          // chan response pool
	timeOutDuration time.Duration                       // time out
}

// set time out duration.
func (ss *SyncSession) SetTimeout(t time.Duration) {
	if t > 0 {
		ss.timeOutDuration = t
	}
}

// set establish callback.
func (ss *SyncSession) SetEstablish(esFunc SyncEstablishFunc) {
	ss.EstablishFunc = esFunc
}

// set message callback.
func (ss *SyncSession) SetDealer(dealFunc SyncMessageProcessFunc) {
	ss.MessageFunc = dealFunc
}

// attach info
func (ss *SyncSession) SetAttach(a interface{}) {
	ss.Attach = a
}
func (ss *SyncSession) GetAttach() interface{} {
	return ss.Attach
}

// OnRecv receives data callback.
func (ss *SyncSession) OnRecv(msg []byte) {
	if len(msg) < int(gMsgDataLen)+int(gMsgIdLen)+gObjIdLen {
		log().Error("[net] message len error")
		return
	}
	data := msg[gMsgDataLen:]
	msgId := binary.BigEndian.Uint16(data[0:])

	// build unique object id.
	if objId := binary.BigEndian.Uint64(data[gMsgIdLen:]); objId != 0 {
		// for synchronous mode
		var resp response
		resp.objId = objId
		resp.MsgId = msgId
		resp.Resp = make([]byte, len(data)-gObjIdLen)
		copy(resp.Resp, data[gObjIdLen:])

		// get chan and put it if find it.
		if v := ss.reqs[resp.objId%gRequireArraySize].get(resp.objId); v != nil {
			v <- resp
			ss.pool.Put(v)
			ss.reqs[resp.objId%gRequireArraySize].remove(resp.objId)
		}
	} else {
		// for asynchronous mode.
		if ss.MessageFunc != nil {
			ss.MessageFunc(ss, msgId, data[gObjIdLen:])
		} else {
			log().Error("[net] can not find SyncSession message function")
		}
	}
}

// getNextObjId gets packet unique id
func (ss *SyncSession) getNextObjId() uint64 {
	return atomic.AddUint64(&ss.nextObjId, 1)
}

// Require message and return (ResponseMessage,error) pair.
func (ss *SyncSession) Require(msgId uint16, data []byte) (ResponseMessage, error) {
	if ss.conn == nil {
		return ResponseMessage{}, fmt.Errorf("connection is nil")
	}

	// add objId before data and send
	objId := ss.getNextObjId()
	respChan := ss.pool.Get().(chan response)

	// insert req chan and send req message.
	ss.reqs[objId%gRequireArraySize].add(objId, respChan)

	// send requirement.
	buildBytes := gMyMsgCodec.BuildMessage(msgId, objId, data)
	secTime := int(ss.timeOutDuration / time.Second)
	if secTime <= 0 {
		secTime = 1
	}
	if err := ss.conn.SendMsgTimeout(buildBytes, secTime); err != nil {
		ss.pool.Put(respChan)
		// try to delete from ss.reqs
		ss.reqs[objId%gRequireArraySize].remove(objId)
		return ResponseMessage{}, err
	}

	// just wait for its response.
	timeOut := time.After(ss.timeOutDuration)
	select {
	case resp := <-respChan:
		return resp.ResponseMessage, nil
	case <-timeOut:
		ss.pool.Put(respChan)
		// remove objId info from map.
		ss.reqs[objId%gRequireArraySize].remove(objId)
		return ResponseMessage{}, fmt.Errorf("wait time out")
	}
}

// Send (msgId, data) message asynchronously.
func (ss *SyncSession) Send(msgId uint16, data []byte) error {
	if ss.conn == nil {
		return errors.New("connection is nil")
	}
	buildBytes := gMyMsgCodec.BuildMessage(msgId, 0, data)
	return ss.conn.SendMsg(buildBytes)
}

// Establish connection callback
func (ss *SyncSession) OnEstablish() {
	atomic.StoreInt32(&ss.state, gOpenFlag)
	if ss.EstablishFunc != nil {
		ss.EstablishFunc(ss, true)
	}
}

// Terminate connection callback
func (ss *SyncSession) OnTerminate() {
	atomic.StoreInt32(&ss.state, gClosedFlag)
	if ss.EstablishFunc != nil {
		ss.EstablishFunc(ss, false)
	}
}

// Set IConnection
func (ss *SyncSession) SetConnection(conn IConnection) {
	ss.conn = conn
}

// Whether it is connected.
func (ss *SyncSession) IsConnected() bool {
	return atomic.LoadInt32(&ss.state) == gOpenFlag
}

// Get net connection.
func (ss *SyncSession) GetConn() IConnection {
	return ss.conn
}

// init response chan size.
func (ss *SyncSession) init() {
	for i := 0; i < gRequireArraySize; i++ {
		ss.reqs[i] = newResponseChanMap()
	}
}

// Release session.
func (ss *SyncSession) Release() {
}
